/**
 * Submit the job to the cluster and wait for it to finish.
 * @param verbose print the progress to the user
 * @return true if the job succeeded
 * @throws IOException thrown if the communication with the
 *         <code>JobTracker</code> is lost
 */
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
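For context, a typical driver calls waitForCompletion(true) as its last step; a minimal sketch (class name and paths are hypothetical, not part of the source above):

// Hypothetical driver showing the usual call site of waitForCompletion.
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "example job");
job.setJarByClass(ExampleDriver.class); // hypothetical driver class
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// true => verbose: monitorAndPrintJob() streams progress to the console;
// false => the client just polls isComplete() at the configured interval.
System.exit(job.waitForCompletion(true) ? 0 : 1);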
/**
 * Internal method for submitting jobs to the system.
 *
 * <p>The job submission process involves:
 * <ol>
 *   <li>Checking the input and output specifications of the job.</li>
 *   <li>Computing the {@link InputSplit}s for the job.</li>
 *   <li>Setting up the requisite accounting information for the
 *       {@link DistributedCache} of the job, if necessary.</li>
 *   <li>Copying the job's jar and configuration to the map-reduce system
 *       directory on the distributed file-system.</li>
 *   <li>Submitting the job to the <code>JobTracker</code> and optionally
 *       monitoring its status.</li>
 * </ol></p>
 * @param job the configuration to submit
 * @param cluster the handle to the Cluster
 * @throws ClassNotFoundException
 * @throws InterruptedException
 * @throws IOException
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job
LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
int maps = writeSplits(job, submitJobDir);
conf.setInt(MRJobConfig.NUM_MAPS, maps);
LOG.info("number of splits:" + maps);
// write "queue admins of the queue to which job is being submitted" // to job file. String queue = conf.get(MRJobConfig. QUEUE_NAME, JobConf. DEFAULT_QUEUE_NAME); AccessControlList acl = submitClient.getQueueAdmins(queue ); conf.set(toFullPropertyName(queue , QueueACL. ADMINISTER_JOBS.getAclName()), acl.getAclString());
// removing jobtoken referrals before copying the jobconf to HDFS
// as the tasks don't need this setting, actually they may break
// because of it if present as the referral will point to a
// different job.
TokenCache.cleanUpTokenReferral(conf);
if (conf.getBoolean(
    MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
    MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
  // Add HDFS tracking ids
  ArrayList<String> trackingIds = new ArrayList<String>();
  for (Token<? extends TokenIdentifier> t :
      job.getCredentials().getAllTokens()) {
    trackingIds.add(t.decodeIdentifier().getTrackingId());
  }
  conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
      trackingIds.toArray(new String[trackingIds.size()]));
}
// Set reservation info if it exists
ReservationId reservationId = job.getReservationId();
if (reservationId != null) {
  conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
}
  // Write job file to submit dir
  writeConf(conf, submitJobFile);

  //
  // Now, actually submit the job (using the submit name)
  //
  printTokens(jobId, job.getCredentials());
  status = submitClient.submitJob(
      jobId, submitJobDir.toString(), job.getCredentials());
  if (status != null) {
    return status;
  } else {
    throw new IOException("Could not launch job");
  }
} finally {
  if (status == null) {
    LOG.info("Cleaning up the staging area " + submitJobDir);
    if (jtFs != null && submitJobDir != null)
      jtFs.delete(submitJobDir, true);
  // sort the splits into order based on size, so that the biggest
  // go first
  Arrays.sort(array, new SplitComparator());
  JobSplitWriter.createSplitFiles(jobSubmitDir, conf,
      jobSubmitDir.getFileSystem(conf), array);
  return array.length;
}
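The SplitComparator used above is a private helper inside JobSubmitter. A sketch of its behavior, reconstructed from the Hadoop 2.x source (the exact body may differ), is: order splits by length, largest first, so the biggest splits get scheduled earliest.

// Sketch of JobSubmitter's private SplitComparator (Hadoop 2.x style).
private static class SplitComparator implements Comparator<InputSplit> {
  @Override
  public int compare(InputSplit o1, InputSplit o2) {
    try {
      // Longer split sorts earlier (descending by length).
      return Long.compare(o2.getLength(), o1.getLength());
    } catch (IOException | InterruptedException e) {
      throw new RuntimeException("exception in compare", e);
    }
  }
}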
Step into getSplits and locate the following three lines; they are the key code:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
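In FileInputFormat, computeSplitSize boils down to clamping the block size between the two configured bounds:

// FileInputFormat's split-size computation (Hadoop 2.x source):
// clamp blockSize into the [minSize, maxSize] range.
protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}

With the defaults (minSize = 1, maxSize = Long.MAX_VALUE), the split size equals the HDFS block size, e.g. 128 MB. Raising the minimum above the block size yields fewer, larger splits; lowering the maximum below it yields more, smaller splits.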
/**
 * Generate the list of files and make them into FileSplits.
 * @param job the job context
 * @throws IOException
 */
public List<InputSplit> getSplits(JobContext job) throws IOException {
  Stopwatch sw = new Stopwatch().start();
  long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
  long maxSize = getMaxSplitSize(job);
  // generate splits
  List<InputSplit> splits = new ArrayList<InputSplit>();
  List<FileStatus> files = listStatus(job);
  for (FileStatus file : files) {
    Path path = file.getPath();
    long length = file.getLen();
    if (length != 0) {
      BlockLocation[] blkLocations;
      if (file instanceof LocatedFileStatus) {
        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
      } else {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        blkLocations = fs.getFileBlockLocations(file, 0, length);
      }
      if (isSplitable(job, path)) {
        long blockSize = file.getBlockSize();
        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
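The excerpt cuts off here. In the Hadoop 2.x source, the method continues by carving the file into chunks of splitSize, with a SPLIT_SLOP factor of 1.1 so that a small remainder stays attached to the last split instead of becoming its own map task. A sketch of that continuation (makeSplit and getBlockIndex are FileInputFormat helpers):

        // Continuation sketch, reconstructed from the Hadoop 2.x source.
        long bytesRemaining = length;
        // SPLIT_SLOP = 1.1: keep splitting while the remainder is more
        // than 10% larger than a full split.
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, splitSize,
              blkLocations[blkIndex].getHosts()));
          bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
          // The final (possibly slightly oversized) tail is the last split.
          int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
          splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
              blkLocations[blkIndex].getHosts()));
        }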
/**
 * Get the minimum split size.
 * @param job the job
 * @return the minimum number of bytes that can be in a split
 */
public static long getMinSplitSize(JobContext job) {
  return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <value>0</value>
  <description>The minimum size chunk that map input should be split
  into. Note that some file formats may have minimum split sizes that
  take priority over this setting.</description>
</property>
/**
 * Get the maximum split size.
 * @param context the job to look at.
 * @return the maximum number of bytes a split can include
 */
public static long getMaxSplitSize(JobContext context) {
  return context.getConfiguration().getLong(SPLIT_MAXSIZE,
      Long.MAX_VALUE);
}
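Besides the XML properties above, both bounds can be set per job through FileInputFormat's static helpers. A brief usage sketch (the 64 MB / 256 MB values are illustrative, not defaults):

// Per-job split tuning: these write SPLIT_MINSIZE / SPLIT_MAXSIZE into
// the job configuration read by getMinSplitSize / getMaxSplitSize.
FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // 64 MB floor
FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // 256 MB ceiling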